from typing import *
import pandas as pd
import numpy as np
from pykalman import KalmanFilter
import plotly.graph_objects as go
# Data source: https://finance.yahoo.com/
# Nokia Corporation (NOK): https://finance.yahoo.com/quote/NOK/history?p=NOK
# Load the NOK price history CSV and keep only the rows from 2000 onwards.
data_dir = './data'
price_col, date_col = 'Adj Close', 'Date'
file_path = f'{data_dir}/NOK.csv'

data = pd.read_csv(file_path)
data.info()  # summary of the raw frame, printed before the date conversion

# Convert the date column to datetimes so it can be compared chronologically.
data[date_col] = pd.to_datetime(data[date_col])
is_recent = data[date_col] >= '2000-01-01'
data = data.loc[is_recent].reset_index(drop=True)

# Notebook previews of the first and last rows.
data.head()
data.tail()
# Plot the raw price series against the date.
input_trace = go.Scatter(
    x=data[date_col],
    y=data[price_col],
    name='input_data',
)
base_layout = go.Layout(
    xaxis_title=date_col,
    yaxis_title=price_col,
    width=1000,
    height=500,
)
fig = go.Figure(data=[input_trace], layout=base_layout)
fig.show("notebook")
def exponential_smoothing(
    series: Sequence[float], alpha: float
) -> List[float]:
    """Smooth ``series`` with simple exponential smoothing.

    The first output equals the first observation; every later output is
    ``alpha * series[n] + (1 - alpha) * result[n - 1]``.

    Args:
        series: Observations in order (a list or a 1-D array).
        alpha: Weight given to the newest observation; ``1 - alpha`` is the
            weight given to the previous smoothed value.

    Returns:
        A list with one smoothed value per observation, or an empty list
        when ``series`` is empty.
    """
    # Use len() rather than truthiness: a NumPy array with more than one
    # element has no unambiguous truth value.
    if len(series) == 0:
        return []

    result = [series[0]]
    for value in series[1:]:
        result.append(alpha * value + (1 - alpha) * result[-1])
    return result
# Compute three alternative smoothers and store each as a column of ``data``.
prices = data[price_col].values

# 1) Exponential smoothing.
alpha = 0.08
data['exponential_smoothing'] = exponential_smoothing(prices, alpha)

# 2) Rolling mean over a fixed-size window.
window_size = 30
data['mean_rolling'] = data[price_col].rolling(window=window_size).mean()

# 3) Kalman smoother on a scalar state: transition and observation are both
#    the identity (1), and the state starts at the first observed price.
kalman_params = dict(
    transition_matrices=1,
    observation_matrices=1,
    initial_state_mean=prices[0],
    initial_state_covariance=1,
    observation_covariance=3,
    transition_covariance=0.05,
)
kf = KalmanFilter(**kalman_params)
# smooth() returns a tuple; element 0 is what gets stored as the smoothed
# series.
data['kalman_smoothing'] = kf.smooth(prices)[0]
# Overlay each smoothed column on the existing price figure; the trace name
# is the column name.
for smoothed_name in (
    'kalman_smoothing',
    'mean_rolling',
    'exponential_smoothing',
):
    fig = fig.add_trace(
        go.Scatter(
            x=data[date_col],
            y=data[smoothed_name],
            name=smoothed_name,
        )
    )
fig.show("notebook")
def find_intervals(series, epsilon=0.019) -> List[float]:
    """Label every point of ``series`` as part of a rising or falling interval.

    The consecutive differences of ``series`` are scanned in order. A point
    whose difference lies strictly inside ``(-epsilon, epsilon)`` counts as a
    stationary point and closes the current interval (the stationary point
    itself belongs to the interval it closes). Every point of an interval
    receives the sign of the summed differences over that interval.

    The number of stationary points found is printed as a side effect.

    Args:
        series: 1-D sequence of values in order.
        epsilon: Half-width of the band around zero in which a difference
            counts as stationary.

    Returns:
        A list with one entry per element of ``series``: ``1`` for a rising
        interval, ``-1`` for a falling one and ``0`` when the differences
        of the interval cancel out. An empty ``series`` yields an empty list.
    """
    if len(series) == 0:
        return []

    # Differentiate the argument itself; reading a module-level DataFrame
    # here would silently ignore whatever the caller passed in.
    derivative = np.diff(series)
    # The last point has no successor, so its derivative is unknown (NaN).
    derivative = np.concatenate((derivative, (np.nan,)))

    # Comparisons with NaN are False, so the last point is never stationary.
    is_stationary = (derivative > -epsilon) & (derivative < epsilon)
    print(f"Stationary poings number = "
          f"{int(is_stationary.sum())}")

    intervals = []
    last_index = len(derivative) - 1
    derivative_sum = 0.0  # only its sign is used, once the interval closes
    count = 0  # number of points collected in the current interval
    for i, deriv in enumerate(derivative):
        count += 1
        # Skip the unknown derivative of the last point.
        if not np.isnan(deriv):
            derivative_sum += deriv
        # Close the interval on a stationary point, or at the end of data.
        if is_stationary[i] or i == last_index:
            intervals.extend([np.sign(derivative_sum)] * count)
            count = 0
            derivative_sum = 0.0
    return intervals
# Split the Kalman-smoothed series into rising / falling intervals and plot
# the interval labels together with the smoothed curve.
smooth_col = 'kalman_smoothing'
intervals = find_intervals(data[smooth_col].values)

fig2 = go.Figure(
    layout=go.Layout(
        xaxis_title=date_col,
        yaxis_title=price_col,
        width=1000,
        height=500,
    )
)
fig2.add_trace(
    go.Scatter(x=data[date_col], y=data[smooth_col], name='smoothed_data')
)
fig2.add_trace(
    go.Scatter(x=data[date_col], y=intervals, name='grow_intervals')
)
fig2.show("notebook")
%%time
# Rolling-window filtering: for each day, run the forward Kalman filter over
# the window of ``window_size`` observations ending on that day and keep only
# the filter's last value.
kf = KalmanFilter(
    transition_matrices=1,
    observation_matrices=1,
    initial_state_mean=data[price_col].values[0],
    initial_state_covariance=1,
    observation_covariance=3,
    transition_covariance=0.05,
)

series = data[price_col].values
window_size = 30
n_iter = series.shape[0] - window_size + 1

# Positions that never end a full window keep their NaN placeholder.
series_smoothed_by_window = np.full(series.shape, np.nan)

for start_timestamp in range(n_iter):
    current_timestamp = start_timestamp + window_size - 1
    window = series[start_timestamp:current_timestamp + 1]

    # Restart the filter state from the first observation of this window.
    kf.initial_state_mean = window[0]
    kf.initial_state_covariance = 3

    # filter() (not smooth()) is used here; element 0 of its result is
    # squeezed to 1-D and its last entry is the value for the current day.
    window_smooth = kf.filter(window)[0].squeeze()
    current_smooth_value = window_smooth[-1]
    series_smoothed_by_window[current_timestamp] = current_smooth_value

realtime_col = 'realtime_filter'
data[realtime_col] = series_smoothed_by_window
# Compare the source prices, the smoothed column and the rolling-window
# filter output on one figure. Each pair is (column to plot, legend label).
fig3_series = (
    (price_col, 'source_data'),
    (smooth_col, smooth_col),
    (realtime_col, f"{realtime_col} (kalman)"),
)
fig3 = go.Figure(
    data=[
        go.Scatter(x=data[date_col], y=data[column], name=label)
        for column, label in fig3_series
    ],
    layout=go.Layout(
        xaxis_title=date_col,
        yaxis_title=price_col,
        width=1000,
        height=500,
    ),
)
fig3.show("notebook")
# Zoom in on the last ``n_days`` rows to compare the three series up close.
n_days = 30
data_tail = data.iloc[-n_days:]

tail_dates = data_tail[date_col]
source_trace = go.Scatter(
    x=tail_dates,
    y=data_tail[price_col],
    name='source_data',
)
smoothed_trace = go.Scatter(
    x=tail_dates,
    y=data_tail[smooth_col],
    name='smoothed_data',
)
realtime_trace = go.Scatter(
    x=tail_dates,
    y=data_tail[realtime_col],
    name=f"{realtime_col} (kalman)",
)
fig4 = go.Figure(
    data=[source_trace, smoothed_trace, realtime_trace],
    layout=go.Layout(
        xaxis_title=date_col,
        yaxis_title=price_col,
        width=1000,
        height=500,
    ),
)
fig4.show("notebook")
# As the plot shows, real-time filtering with a 30-day rolling window returns the smoothed value with some lag.
# Initial state of the animation: both traces start with only the first two
# points of the tail. Each pair is (column to plot, legend label).
df = data_tail
trace1, trace2 = (
    go.Scatter(
        x=df[date_col][:2],
        y=df[column][:2],
        mode='lines',
        name=label,
        line=go.scatter.Line(width=2),
    )
    for column, label in (
        (price_col, 'Source data'),
        (realtime_col, 'Filtered data'),
    )
)
# One animation frame per step: frame ``k`` redraws traces 0 and 1 with the
# first ``k + 1`` rows of ``df``. The range runs up to ``len(df) - 1``
# inclusive so that the final frame contains the last observation as well.
# ``.iloc`` makes the positional slicing explicit, independent of the
# integer labels carried by the frame's index.
frames = [
    dict(
        data=[
            dict(
                type='scatter',
                x=df[date_col].iloc[:k + 1],
                y=df[price_col].iloc[:k + 1],
            ),
            dict(
                type='scatter',
                x=df[date_col].iloc[:k + 1],
                y=df[realtime_col].iloc[:k + 1],
            ),
        ],
        traces=[0, 1],
    )
    for k in range(1, len(df))
]
# Axis ranges are fixed (autorange off) so the view does not rescale while
# the animation frames grow. The y range spans BOTH plotted columns: the
# filtered values are not guaranteed to stay inside the range of the source
# prices, and a price-only range could clip the filtered trace.
plotted_values = df[[price_col, realtime_col]]
y_low = plotted_values.min().min()
y_high = plotted_values.max().max()

layout = go.Layout(
    width=1000,
    height=500,
    showlegend=True,
    hovermode='x unified',
    xaxis=dict(
        range=[df[date_col].min(), df[date_col].max()],
        autorange=False,
    ),
    yaxis=dict(
        range=[y_low, y_high],
        autorange=False,
    ),
    updatemenus=[
        dict(
            type='buttons',
            showactive=False,
            y=0.05,
            x=1.05,
            xanchor='left',
            yanchor='top',
            pad=dict(t=0, r=10),
            buttons=[
                dict(
                    label='Play animation',
                    method='animate',
                    # First argument None: no explicit frame list is given
                    # (presumably plays all frames; confirm in Plotly docs).
                    args=[
                        None,
                        dict(
                            frame=dict(duration=150, redraw=True),
                            transition=dict(duration=0),
                            fromcurrent=True,
                            mode='immediate',
                        ),
                    ],
                )
            ],
        )
    ],
)

fig5 = go.Figure(data=[trace1, trace2], frames=frames, layout=layout)
fig5.show("notebook")